package com.galeno.练习

import ch.hsr.geohash.GeoHash
import com.alibaba.fastjson.JSON
import com.galeno.utils.SparkUtil
import org.apache.spark.rdd.RDD

import java.sql.{DriverManager, ResultSet}

/**
 * @Title: GeoHashDemo2
 * @Description: Enriches app-log JSON records with province/city/region
 *               resolved from a MySQL geohash reference table.
 * @author galeno
 * @date 2021/9/22 0:59
 */
/**
 * Reads app-log lines (one JSON object per line), computes a 6-character
 * geohash from each record's latitude/longitude, looks up province/city/region
 * in the MySQL table `ref_geohash`, adds those fields to the JSON object, and
 * writes the enriched records back out as JSON text.
 */
object GeoHashDemo2 {
  def main(args: Array[String]): Unit = {
    val sc = SparkUtil.getSc
    // Load the raw log file (one JSON object per line).
    val logRdd: RDD[String] = sc.textFile("data/app_log_2021-06-05.log")

    // Enrich every record with province/city/region.
    val areaAdded: RDD[String] = logRdd.mapPartitions(iter => {
      // One connection + prepared statement per partition, reused for every row.
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "root")
      val stmt = conn.prepareStatement("select province,city,region  from ref_geohash where geohash=?")

      // Iterate the log lines and query the reference table per record.
      // NOTE: the result is materialized eagerly (toList) so the connection
      // can be closed before returning — mapPartitions' iterator is consumed
      // lazily, after this closure exits, so closing right after a lazy map
      // would break the queries. Assumes a partition fits in memory, which is
      // fine for this demo-sized dataset.
      val enriched = iter.map(line => {
        // Parse the JSON log line.
        val jsonObj = JSON.parseObject(line)
        val lat = jsonObj.getDouble("latitude")
        val lng = jsonObj.getDouble("longitude")
        // 6 characters ≈ neighborhood-level precision, matching the reference table keys.
        val geohash = GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 6)
        stmt.setString(1, geohash)
        val rs: ResultSet = stmt.executeQuery()
        // Default to empty strings when the geohash has no match in the table.
        // BUG FIX: the original declared new `val`s inside the if-block, which
        // shadowed these outer variables — the lookup result was silently
        // discarded and every record got empty province/city/region.
        var province = ""
        var city = ""
        var region = ""
        if (rs.next()) {
          province = rs.getString("province")
          city = rs.getString("city")
          region = rs.getString("region")
        }
        rs.close() // release the per-query result set

        // Attach the resolved area fields to the record.
        jsonObj.put("province", province)
        jsonObj.put("city", city)
        jsonObj.put("region", region)

        jsonObj.toJSONString
      }).toList

      // Partition fully processed — safe to release JDBC resources now.
      stmt.close()
      conn.close()

      enriched.iterator
    })
    areaAdded.saveAsTextFile("dataout/aread/")
    sc.stop()
  }
}
